AWS SDK for JavaScript v3 の DynamoDB Document Client を使って paginate scan をしてみた

AWS SDK for JavaScript v3 の DynamoDB Document Client を使って paginate scan をしてみた

Clock Icon2024.2.18

こんにちは、CX 事業本部製造ビジネステクノロジー部の若槻です。

AWS SDK for JavaScript v3 では、DynamoDB Client を DynamoDB Document Client でラップすることにより、通常(ネイティブの JavaScript 型)の JSON 形式でパラメーターに指定したり、レスポンスを受け取ることができます。

lib-dynamodb の README はこちらです。

今回は、この DynamoDB Document Client を使って paginate scan をしてみました。

試してみた

サンプルデータ

テーブルに投入するサンプルデータとして、1 列目がパーティションキー(id)、2 列目がソートキー(timestamp)の CSV ファイルを作成します。

d001,1700625273
d001,1699658818
d001,1703858878
d001,1681316462
d001,1695108297
d001,1694674832
d001,1680945699
d001,1701799579
d001,1696271173
d001,1685651084
d002,1706301230
d002,1679314750
d002,1701457171
d002,1685919651
d002,1684091128

リソース作成(AWS CDK)

必要なリソースの作成は、AWS CDK(TypeScript)を使って行います。またテーブルへの paginate scan は Lambda 関数上で実行します。

import {
  aws_lambda,
  aws_logs,
  aws_lambda_nodejs,
  aws_dynamodb,
  aws_s3,
  aws_s3_deployment,
  Stack,
  RemovalPolicy,
  CfnOutput,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    // サンプルデータアップロード用の S3 バケット
    const bucket = new aws_s3.Bucket(this, 'Bucket', {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // サンプルデータのアップロード
    new aws_s3_deployment.BucketDeployment(this, 'DeploySampleTableData', {
      sources: [aws_s3_deployment.Source.asset('./src/tableData')],
      destinationBucket: bucket,
    });

    // DynamoDB テーブル
    const sampleTable = new aws_dynamodb.Table(this, 'SampleTable', {
      partitionKey: { name: 'id', type: aws_dynamodb.AttributeType.STRING },
      sortKey: { name: 'timestamp', type: aws_dynamodb.AttributeType.NUMBER },
      billingMode: aws_dynamodb.BillingMode.PAY_PER_REQUEST,
      removalPolicy: RemovalPolicy.DESTROY,
      importSource: {
        inputFormat: aws_dynamodb.InputFormat.csv({
          delimiter: ',',
          headerList: ['id', 'timestamp'],
        }),
        bucket,
      },
    });

    // Lambda 関数
    const sampleFunc = new aws_lambda_nodejs.NodejsFunction(
      this,
      'SampleFunc',
      {
        architecture: aws_lambda.Architecture.ARM_64,
        runtime: aws_lambda.Runtime.NODEJS_20_X,
        logGroup: new aws_logs.LogGroup(this, 'SampleFuncLogGroup', {
          removalPolicy: RemovalPolicy.DESTROY,
        }),
        environment: {
          SAMPLE_TABLE_NAME: sampleTable.tableName,
        },
      }
    );

    // Lambda 関数に DynamoDB テーブルへのアクセス権限を付与
    sampleTable.grantReadData(sampleFunc);

    // Lambda 関数名の出力
    new CfnOutput(this, 'SampleFuncName', {
      value: sampleFunc.functionName,
    });
  }
}

Lambda コード

テーブルへの paginate scan を行う Lambda 関数のコードです。paginateScan() では非同期なイテラブルが返されるため、for await...of によりページごとのデータを順番(直列)に行います。

import { paginateScan, DynamoDBDocument } from '@aws-sdk/lib-dynamodb';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';

const SAMPLE_TABLE_NAME = process.env.SAMPLE_TABLE_NAME || '';

interface DataItem {
  id: string;
  timestamp: number;
}

const ddbDocClient = DynamoDBDocument.from(
  new DynamoDBClient({
    region: 'ap-northeast-1',
    apiVersion: '2012-08-10',
  })
);

export const handler = async (): Promise<void> => {
  const paginator = paginateScan(
    {
      client: ddbDocClient,
    },
    {
      TableName: SAMPLE_TABLE_NAME,
    }
  );
  const items: DataItem[] = [];

  for await (const page of paginator) {
    console.log(page.Count);
    items.push(...(page.Items as DataItem[]));
  }

  console.log(items);
};

Lambda 関数を含むリソースを CDK でデプロイして、関数を実行します。

# CKD デプロイ
npx cdk deploy --require-approval never --method=direct

# Lambda 関数実行
aws lambda invoke --function-name ${sampleFuncName} outputfile.txt

実行結果のログです。パーティションキーごとに、ソートキーによる昇順で取得されています。また取得データは通常の JSON 形式となっています。

15

[
  { id: 'd001', timestamp: 1680945699 },
  { id: 'd001', timestamp: 1681316462 },
  { id: 'd001', timestamp: 1685651084 },
  { id: 'd001', timestamp: 1694674832 },
  { id: 'd001', timestamp: 1695108297 },
  { id: 'd001', timestamp: 1696271173 },
  { id: 'd001', timestamp: 1699658818 },
  { id: 'd001', timestamp: 1700625273 },
  { id: 'd001', timestamp: 1701799579 },
  { id: 'd001', timestamp: 1703858878 },
  { id: 'd002', timestamp: 1679314750 },
  { id: 'd002', timestamp: 1684091128 },
  { id: 'd002', timestamp: 1685919651 },
  { id: 'd002', timestamp: 1701457171 },
  { id: 'd002', timestamp: 1706301230 }
]

ProjectionExpression を指定

ProjectionExpression を指定して、レスポンスデータの属性を絞り込みます。

export const handler = async (): Promise<void> => {
  const paginator = paginateScan(
    {
      client: ddbDocClient,
    },
    {
      TableName: SAMPLE_TABLE_NAME,
      ProjectionExpression: 'id', // 指定
    }
  );
  const items: DataItem[] = [];

  for await (const page of paginator) {
    console.log(page.Count);

    items.push(...(page.Items as DataItem[]));
  }

  console.log(items);
};

Lambda 関数をデプロイして実行すると、データの属性が id に絞り込まれて返されました。

15

[
  { id: 'd001' }, { id: 'd001' },
  { id: 'd001' }, { id: 'd001' },
  { id: 'd001' }, { id: 'd001' },
  { id: 'd001' }, { id: 'd001' },
  { id: 'd001' }, { id: 'd001' },
  { id: 'd002' }, { id: 'd002' },
  { id: 'd002' }, { id: 'd002' },
  { id: 'd002' }
]

FilterExpression を指定

FilterExpression を指定して、条件に合致するデータのみを取得します。client-dynamodb との違いとして、ExpressionAttributeValues では通常の JSON 形式を指定します。

export const handler = async (): Promise<void> => {
  const paginator = paginateScan(
    {
      client: ddbDocClient,
    },
    {
      TableName: SAMPLE_TABLE_NAME,
      ExpressionAttributeNames: {
        '#timestamp': 'timestamp',
      },
      ExpressionAttributeValues: {
        ':timestamp': 1700000000,
      },
      FilterExpression: '#timestamp >= :timestamp',
    }
  );
  const items: DataItem[] = [];

  for await (const page of paginator) {
    console.log(page.Count);

    items.push(...(page.Items as DataItem[]));
  }

  console.log(items);
};

Lambda 関数をデプロイして実行すると、timestamp1700000000 以上のデータのみが取得されました。

5

[
  { id: 'd001', timestamp: 1700625273 },
  { id: 'd001', timestamp: 1701799579 },
  { id: 'd001', timestamp: 1703858878 },
  { id: 'd002', timestamp: 1701457171 },
  { id: 'd002', timestamp: 1706301230 }
]

pageSize を指定

pageSize を指定して、ページごとのデータ取得数を制限します。

export const handler = async (): Promise<void> => {
  const paginator = paginateScan(
    {
      client: ddbDocClient,
      pageSize: 3, // 指定
    },
    {
      TableName: SAMPLE_TABLE_NAME,
    }
  );
  const items: DataItem[] = [];

  for await (const page of paginator) {
    console.log(page.Count);
    items.push(...(page.Items as DataItem[]));
  }

  console.log(items);
};

Lambda 関数をデプロイして実行すると、ページごとに 3 件ずつのデータが取得されました。

3
3
3
3
3
0

[
  { id: 'd001', timestamp: 1680945699 },
  { id: 'd001', timestamp: 1681316462 },
  { id: 'd001', timestamp: 1685651084 },
  { id: 'd001', timestamp: 1694674832 },
  { id: 'd001', timestamp: 1695108297 },
  { id: 'd001', timestamp: 1696271173 },
  { id: 'd001', timestamp: 1699658818 },
  { id: 'd001', timestamp: 1700625273 },
  { id: 'd001', timestamp: 1701799579 },
  { id: 'd001', timestamp: 1703858878 },
  { id: 'd002', timestamp: 1679314750 },
  { id: 'd002', timestamp: 1684091128 },
  { id: 'd002', timestamp: 1685919651 },
  { id: 'd002', timestamp: 1701457171 },
  { id: 'd002', timestamp: 1706301230 }
]

Limit を指定(効果なし)

ここで、ページネーション処理においては、 Limit を指定して取得するデータ数を制限できるのでしょうか?

export const handler = async (): Promise<void> => {
  const paginator = paginateScan(
    {
      client: ddbDocClient,
    },
    {
      TableName: SAMPLE_TABLE_NAME,
      Limit: 5, // 指定
    }
  );
  const items: DataItem[] = [];

  for await (const page of paginator) {
    console.log(page.Count);
    items.push(...(page.Items as DataItem[]));
  }

  console.log(items);
};

Lambda 関数をデプロイして実行すると、ページごとに 15 件のデータが取得されました。TypeScript の型定義では指定可能であるにもかかわらず、Limit の指定は効果が無いようです。

15

[
  { id: 'd001', timestamp: 1680945699 },
  { id: 'd001', timestamp: 1681316462 },
  { id: 'd001', timestamp: 1685651084 },
  { id: 'd001', timestamp: 1694674832 },
  { id: 'd001', timestamp: 1695108297 },
  { id: 'd001', timestamp: 1696271173 },
  { id: 'd001', timestamp: 1699658818 },
  { id: 'd001', timestamp: 1700625273 },
  { id: 'd001', timestamp: 1701799579 },
  { id: 'd001', timestamp: 1703858878 },
  { id: 'd002', timestamp: 1679314750 },
  { id: 'd002', timestamp: 1684091128 },
  { id: 'd002', timestamp: 1685919651 },
  { id: 'd002', timestamp: 1701457171 },
  { id: 'd002', timestamp: 1706301230 }
]

ページごとの取得処理を並列化したい場合

for await...of 内でデータの取得まで行う場合は、直列で処理が行われるためデータ数が多い場合は取得に時間を要します。並列で取得する場合は下記のように Promise を使ってページごとのデータ取得処理を並列化することができます。

並列取得により処理が高速化される動作とならなかっため確認中。

export const handler = async (): Promise<void> => {
  const paginator = paginateScan(
    {
      client: ddbDocClient,
      pageSize: 1,
    },
    {
      TableName: SAMPLE_TABLE_NAME,
    }
  );

  const pagePromises: Promise<DataItem[]>[] = [];

  for await (const page of paginator) {
    const pagePromise = new Promise<DataItem[]>((resolve) => {
      resolve(page.Items as DataItem[]);
    });

    pagePromises.push(pagePromise);
  }

  const items = (await Promise.all(pagePromises)).flat();

  console.log(items);
};

おわりに

AWS SDK for JavaScript v3 の DynamoDB Document Client を使って paginate scan をしてみました。

記述が冗長となる client-dynamodb と比べて、lib-dynamodb では JSON 形式でデータを扱うことができるため、コードがシンプルになります。一部パラメーターが上手く動作しないことなどに気をつければ便利に利用することができます。

参考

以上

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.